package com.lzht.games.room.stream;

import com.alibaba.fastjson.JSONObject;
import com.lzht.games.common.common.NotifyMsg;
import com.lzht.games.common.common.RequestMsg;
import com.lzht.games.common.constant.Results;
import com.lzht.games.common.header.NotifyHeader;
import com.lzht.games.common.header.ReqHeader;
import com.lzht.games.room.cache.RoomCache;
import com.lzht.games.room.common.Topic;
import com.lzht.games.room.model.Address;
import com.lzht.games.room.protocol.*;
import com.lzht.games.room.service.RoomService;
import org.apache.commons.lang3.RandomUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

import java.util.Set;

/**
 * Stream listener for room traffic: receives messages from the bound input
 * channels, updates room membership through {@code RoomService}, and forwards
 * responses/notifications through {@code DynamicSendService}.
 *
 * <p>Each running instance picks a random {@link #INSTANCE_ID}. The general
 * entry point stamps it onto the incoming message header; the login/logout
 * handlers compare it so that only the instance whose id matches the header
 * sends the response and the room notification.
 * NOTE(review): presumably login/logout topics are delivered to every
 * instance so each can update its local room state — confirm against the
 * binder configuration.
 *
 * Create by weishuaiding on 2018/11/29 19:01
 */
@EnableBinding(RoomSource.class)
public class ReceiveService {
    private static final Logger logger = LoggerFactory.getLogger(ReceiveService.class);

    /** Message code that is routed to {@code SendService.loginNotify}. */
    private static final int MSG_CODE_LOGIN = 30001;
    /** Message code that is routed to {@code SendService.logoutNotify}. */
    private static final int MSG_CODE_LOGOUT = 30003;

    /** Random id of this running instance, chosen once at class load. */
    private static final long INSTANCE_ID = RandomUtils.nextLong();

    @Autowired
    private SendService sendService;
    @Autowired
    private DynamicSendService dynamicSendService;
    @Autowired
    private RoomService roomService;

    /**
     * General entry point for room messages. Stamps this instance's id onto
     * the message header and dispatches by the request header's message code:
     * login, logout, or (anything else) a plain room message.
     *
     * @param message incoming notification; its notify data must contain a
     *                {@code "header"} entry convertible to {@code ReqHeader}
     */
    @StreamListener(Sink.INPUT)
    public void roomService(NotifyMsg<JSONObject> message) {
        logger.info("房间总消息入口：{}", message);
        message.getHeader().setInstanceId(INSTANCE_ID);

        JSONObject object = message.getNotifyData();
        ReqHeader header = object.getObject("header", ReqHeader.class);

        if (header.getMsgCode() == MSG_CODE_LOGIN) {
            sendService.loginNotify(message);
        } else if (header.getMsgCode() == MSG_CODE_LOGOUT) {
            sendService.logoutNotify(message);
        } else {
            sendService.messageNotify(message);
        }
    }

    /**
     * Handles a room login: registers the user's address in the room, and,
     * when this instance's id matches the one in the header, sends the login
     * response and notifies the other room members.
     *
     * @param message login notification carrying {@code "reqData"}
     *                ({@code LoginReq}) and {@code "header"} ({@code ReqHeader})
     */
    @StreamListener(Topic.INPUT_LOGIN)
    public void loginReceive(NotifyMsg<JSONObject> message) {
        logger.info("收到登录房间请求：{}", message);
        NotifyHeader header = message.getHeader();

        JSONObject object = message.getNotifyData();
        LoginReq req = object.getObject("reqData", LoginReq.class);
        logger.info("LoginReq: {}", req);

        ReqHeader reqHeader = object.getObject("header", ReqHeader.class);
        logger.info("reqHeader: {}", reqHeader);

        Address address = new Address();
        address.setAccessId(header.getAccessId());
        address.setSessionId(header.getSessionId());
        address.setRoomId(req.getRoomId());
        address.setUserId(req.getUserId());

        roomService.addRoomUser(req.getRoomId(), address);

        // Reply only from the instance whose id was stamped on the header.
        if (INSTANCE_ID == message.getHeader().getInstanceId()) {
            LoginResp resp = new LoginResp();
            dynamicSendService.response(resp, reqHeader.getRequestId(), message);

            notifyRoomUser(req.getRoomId(), req.getUserId(), message);
        }
    }

    /**
     * Handles a room logout: removes the user from the room, and, when this
     * instance's id matches the one in the header, sends the logout response
     * and notifies the remaining room members.
     *
     * @param message logout notification carrying {@code "reqData"}
     *                ({@code LogoutReq}) and {@code "header"} ({@code ReqHeader})
     */
    @StreamListener(Topic.INPUT_LOGOUT)
    public void logoutReceive(NotifyMsg<JSONObject> message) {
        logger.info("收到退出房间请求：{}", message);

        JSONObject object = message.getNotifyData();
        LogoutReq req = object.getObject("reqData", LogoutReq.class);
        logger.info("LogoutReq: {}", req);

        ReqHeader reqHeader = object.getObject("header", ReqHeader.class);
        logger.info("reqHeader: {}", reqHeader);

        roomService.removeRoomUser(req.getRoomId(), req.getUserId());

        // Reply only from the instance whose id was stamped on the header.
        if (INSTANCE_ID == message.getHeader().getInstanceId()) {
            LogoutResp resp = new LogoutResp();
            dynamicSendService.response(resp, reqHeader.getRequestId(), message);

            notifyRoomUser(req.getRoomId(), req.getUserId(), message);
        }
    }

    /**
     * Handles a message sent to a room. The sender (user id + session id) must
     * be a registered member of the room, otherwise an error response with
     * {@code Results.ERROR_001} is returned. On success the sender's own
     * session receives the response and every other address in the room
     * receives the message.
     *
     * @param message room message carrying {@code "reqData"}
     *                ({@code SendMsgReq}) and {@code "header"} ({@code ReqHeader})
     */
    @StreamListener(Topic.INPUT_MESSAGE)
    public void messageReceive(NotifyMsg<JSONObject> message) {
        logger.info("收到房间消息：{}", message);

        JSONObject object = message.getNotifyData();
        SendMsgReq req = object.getObject("reqData", SendMsgReq.class);
        logger.info("SendMsgReq: {}", req);

        ReqHeader reqHeader = object.getObject("header", ReqHeader.class);
        logger.info("reqHeader: {}", reqHeader);

        // Remember the sender's session now: the header's session id is
        // overwritten for every recipient inside the loop below, so reading it
        // there would no longer identify the sender.
        String senderSessionId = message.getHeader().getSessionId();

        Set<Address> addresses = roomService.getRoomUsers(req.getRoomId());
        SendMsgResp resp = new SendMsgResp();
        if (!checkRoomUser(req.getUserId(), senderSessionId, addresses)) {
            dynamicSendService.responseError(resp, reqHeader.getRequestId(), message, Results.ERROR_001);
            return;
        }

        // checkRoomUser returned true, so addresses is non-null and non-empty.
        for (Address addr : addresses) {
            message.getHeader().setSessionId(addr.getSessionId());
            message.getHeader().setAccessId(addr.getAccessId());

            boolean isSender = addr.getUserId() == req.getUserId().intValue()
                    && addr.getSessionId().equals(senderSessionId);
            if (isSender) {
                dynamicSendService.response(resp, reqHeader.getRequestId(), message);
            } else {
                dynamicSendService.send(addr.getAccessId(), message);
            }
        }
    }

    /**
     * Handles a session going offline: looks the session up in
     * {@code RoomCache} and, if it is known, builds a logout request for that
     * user and sends it to the other members of the room.
     *
     * <p>NOTE(review): this method only notifies; it does not call
     * {@code roomService.removeRoomUser}. Confirm that the membership entry is
     * cleaned up elsewhere.
     *
     * @param sessionId id of the session that went offline
     */
    @StreamListener(Topic.INPUT_OFFLINE)
    public void offlineService(String sessionId) {
        logger.info("用户离线 --> sessionId：{}", sessionId);

        Address address = RoomCache.getAddressBySessionId(sessionId);
        if (address == null) {
            return;
        }

        // Build the logout-room protocol message on behalf of the offline user.
        LogoutReq req = new LogoutReq();
        req.setRoomId(address.getRoomId());
        req.setUserId(address.getUserId());

        RequestMsg reqMsg = new RequestMsg();
        ReqHeader header = new ReqHeader(req);
        header.setRequestId(String.valueOf(RandomUtils.nextLong()));
        reqMsg.setHeader(header);
        reqMsg.setReqData(req);

        NotifyMsg<JSONObject> notifyMsg = new NotifyMsg<>();
        notifyMsg.setHeader(new NotifyHeader());
        notifyMsg.setNotifyData((JSONObject) JSONObject.toJSON(reqMsg));

        notifyRoomUser(req.getRoomId(), req.getUserId(), notifyMsg);
    }

    /**
     * Sends {@code message} to every member of the room except the sender.
     * The message header's session id and access id are rewritten for each
     * address visited, so the caller's {@code message} is mutated.
     *
     * @param roomId   room whose members are notified
     * @param senderId user id that must not receive the notification
     * @param message  notification to deliver; its header is modified in place
     */
    public void notifyRoomUser(Integer roomId, Integer senderId, NotifyMsg<JSONObject> message) {
        logger.info("通知房间成员：{}", message);
        Set<Address> addresses = roomService.getRoomUsers(roomId);
        if (addresses == null || addresses.isEmpty()) {
            return;
        }

        for (Address addr : addresses) {
            message.getHeader().setSessionId(addr.getSessionId());
            message.getHeader().setAccessId(addr.getAccessId());

            if (addr.getUserId() != senderId.intValue()) {
                dynamicSendService.send(addr.getAccessId(), message);
            }
        }
    }

    /**
     * Checks whether the given user/session pair is registered in the room.
     *
     * @param userId     user id to look for
     * @param sessionId  session id that must belong to that user
     * @param addressSet room members; may be {@code null}
     * @return {@code true} if some address matches both ids, {@code false}
     *         otherwise (including when {@code addressSet} is {@code null})
     */
    private boolean checkRoomUser(Integer userId, String sessionId, Set<Address> addressSet) {
        if (addressSet == null) {
            return false;
        }
        for (Address address : addressSet) {
            if (address.getUserId() == userId.intValue()
                    && address.getSessionId().equals(sessionId)) {
                return true;
            }
        }
        return false;
    }

}
